package com.xzx.flink.streamapi.watermark;

import com.xzx.flink.bean.ClickEvent;
import com.xzx.flink.streamapi.source.ClickSource;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

/**
 * @version 1.0
 * @auther xinzhixuan
 * @date 2022/4/30 13:58
 */
public class Watermark_BoundedOutOfOrderness {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.
                                <ClickEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner(new SerializableTimestampAssigner<ClickEvent>() {
                                    @Override
                                    public long extractTimestamp(ClickEvent element, long recordTimestamp) {
                                        return element.getTimestamp();
                                    }
                                })
                )
                .print();

        env.execute(Watermark_BoundedOutOfOrderness.class.getSimpleName());
    }
}
